fix(bigquery/storage/managedwriter): append improvements #5465
Conversation
Fixes: googleapis#5459 This PR addresses two issues. The receiver channel for processing asynchronous updates is switched to a buffered channel, sized based on the allowed append depth. The second change allows for better context expiry/cancellation when invoking AppendRows on a managed stream. This also improves testing with some test refactors, as well as shaking out some timing issues due to the larger queue depth.
return nil, err
// Call the underlying append. The stream has its own retained context and will surface expiry on
// its own, but we also need to respect any deadline for the provided context.
errCh := make(chan error)
nit: Not sure if you import or use it already, but this looks like a great use case for errgroup.
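For illustration, here is roughly what the errgroup shape could look like (a sketch only, assuming golang.org/x/sync/errgroup; ms.append and pw stand in for the identifiers in the diff and their exact signatures are not taken from this PR):

// assumes: import "golang.org/x/sync/errgroup"
g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
	// Run the underlying append; the group context is cancelled
	// once any goroutine in the group returns an error.
	return ms.append(gCtx, pw)
})
// Note that Wait blocks until the goroutine returns, so this tidies
// the error plumbing but does not by itself return early when the
// caller's context expires.
if err := g.Wait(); err != nil {
	return nil, err
}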
This particular use case is handling a single request on a bidi stream and supporting cancellation, but it's certainly something we may end up using if we add write multiplexing.
// its own, but we also need to respect any deadline for the provided context.
errCh := make(chan error)
var appendErr error
go func() {
There is the potential here that append will block waiting to add a pending write to the pending writes channel. When that happens, the context below could expire, returning an error from AppendRows. However, this goroutine is still running, and as soon as the pending channel is freed the append will in fact succeed even though an error was returned.
Furthermore, this goroutine not being stopped when the caller context expires means that if append becomes slow due to a full pending write channel, rapid calls to AppendRows, even with short timeouts, could result in a very large number of goroutines left running.
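To make the scenario concrete, here is a distilled version of the pattern being discussed (simplified; the signature of append and the surrounding state are illustrative, not the exact code in this PR):

errCh := make(chan error)
go func() {
	// If the pending-writes channel is full, append blocks here.
	errCh <- ms.append(pw) // simplified signature
}()
select {
case err := <-errCh:
	return err
case <-ctx.Done():
	// AppendRows returns an error, but the goroutine above is still
	// blocked; once the pending channel frees up the append still goes
	// through (and its send on errCh then has no receiver), so rapid
	// timed-out calls accumulate leaked goroutines.
	return ctx.Err()
}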
Good point. I was reasoning through backend behaviors and didn't think through this scenario.
Spent some time refactoring this, as there were other correctness issues as well, primarily around potential races.
Also added a test explicitly to watch for goroutine leaks.
}
if err == nil {
	// Compute numRows, once we pass ownership to the channel the request may be
	// cleared.
	numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))
	ch <- pw
Should this guard against the otherCtx expiring as well? Something like:
select {
case ch <- pw:
// We've passed ownership of the pending write to the channel.
// It's now responsible for marking the request done, we're done
// with the critical section.
ms.mu.Unlock()
// Record stats and return.
recordStat(ms.ctx, AppendRequests, 1)
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
recordStat(ms.ctx, AppendRequestRows, numRows)
return nil
case <-otherCtx.Done():
ms.mu.Unlock()
return otherCtx.Err()
}
That way the call to append is guaranteed not to block if ch is full, and it is guaranteed to return within the given context timeout (within some margin). And once calls to append are guaranteed to return within the timeout, the goroutine in AppendRows could probably be removed and replaced with a synchronous call, I believe.
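A rough sketch of that follow-on idea, assuming append were changed to accept and honor the combined context (ms.append, pw, and pw.result are used illustratively here; this is not the code that landed in the PR):

// Hypothetical shape: append selects on otherCtx internally, so it
// returns promptly on expiry and can be called synchronously.
if err := ms.append(otherCtx, pw); err != nil {
	return nil, err // no goroutine or errCh needed
}
return pw.result, nil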
The issue is that the select allows the writer to get out of sync processing append responses, which arrive in the same order as append requests were accepted. We use the channel for maintaining the ordering when processing the responses.
Understood. Thanks.
Fixes: #5459
This PR addresses two issues. The receiver channel for processing
asynchronous updates is switched to a buffered channel, sized based
on the allowed append depth.
The second change allows for better context expiry/cancellation
when invoking AppendRows on a managed stream.
This also improves testing with some test refactors, as well as
shaking out some timing issues due to the larger queue depth.
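As a point of reference, the buffered-channel portion of the change amounts to something like the following sketch (the setting name and channel element type are assumptions for illustration, not lifted from the PR):

// Size the receiver channel to the allowed append depth rather than
// leaving it unbuffered.
depth := ms.streamSettings.MaxInflightRequests // assumed setting name
pendingCh := make(chan *pendingWrite, depth)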